package org.qing.reactive.config.redis;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.Callable;

import org.springframework.cache.Cache;
import org.springframework.data.redis.core.ReactiveRedisTemplate;

import com.alibaba.fastjson.JSON;

import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
@SuppressWarnings({ "unchecked", "rawtypes", "preview" })
public class ReactiveRedisCache implements Cache {

	private String name;

	private ReactiveRedisTemplate<String, Object> nativeCache;

	public ReactiveRedisCache(String name, ReactiveRedisTemplate<String, Object> nativeCache) {
		this.name = name;
		this.nativeCache = nativeCache;
	}

	@Override
	public String getName() {
		log.info("getName()=[{}]", name);
		return name;
	}

	@Override
	public Object getNativeCache() {
		log.info("getNativeCache()=[{}]", nativeCache);
		return nativeCache;
	}

	@Override
	public ValueWrapper get(Object key) {
		log.info("get()=[{}]", key);
		RedisValueWrapper value = new RedisValueWrapper();
		nativeCache.hasKey((String) key).subscribe(hasKey -> {
			if (hasKey) {
				nativeCache.type((String) key).subscribe(type -> {
					if ("string".equals(type.name())) {
						value.setValue(nativeCache.opsForValue().get(key));
					}
					if ("list".equals(type.name())) {
						nativeCache.opsForList().size((String) key).subscribe(
								size -> value.setValue(nativeCache.opsForList().range((String) key, 0, size)));
					}
					if ("hash".equals(type.name())) {
						value.setValue(nativeCache.opsForHash().entries((String) key));
					}
					if ("set".equals(type.name())) {
						value.setValue(nativeCache.opsForSet().scan((String) key));
					}
					if ("zset".equals(type.name())) {
						value.setValue(nativeCache.opsForZSet().scan((String) key));
					}
				});
			}
		});
		return value.get() == null? null : value;
	}

	@Override
	public <T> T get(Object key, Class<T> type) {
		log.info("get()=[key={},type={}]", key, type.getName());
		return (T) get(key);
	}

	@Override
	public <T> T get(Object key, Callable<T> valueLoader) {
		log.info("get()=[key={},valueLoader={}]", key, valueLoader.getClass().getName());
		return (T) get(key);
	}

	@Override
	public void put(Object key, Object value) {
		if (value == null) {
			log.info("put()=[key={},value={}]", key, value);
			return;
		}
		if (value instanceof Flux v) {
			log.info("put()=[key={},flux={}]", key, value instanceof Flux);
			v.subscribe(data -> {
				log.info("put()=[key={},value={}]", key, JSON.toJSONString(data));
				nativeCache.opsForList().rightPush((String) key, data, 60000);
			});
			return;
		}
		if (value instanceof Mono v) {
			log.info("put()=[key={},flux={}]", key, value instanceof Mono);
			v.subscribe(data -> {
				log.info("put()=[key={},value={}]", key, JSON.toJSONString(data));
				nativeCache.opsForValue().set((String) key, data, 60000);
			});
			return;
		}
		log.info("put()=[key={},value={}", key, JSON.toJSONString(value));
		nativeCache.opsForValue().set((String) key, value, 60000);
	}

	@Override
	public void evict(Object key) {
		log.info("evict()=[{}]", key);
		nativeCache.delete(new String[] { (String) key });
	}

	@Override
	public void clear() {
		log.info("clear()");
	}

	@Setter
	class RedisValueWrapper implements ValueWrapper {

		private Object value;

		@Override
		public Object get() {
			return value;
		}

	}
}
